/**
 * FileName: StructuredStreaming
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2018/11/30 11:18
 * Description: 采用Structured Streaming进行数据采集
 * stream join stream 方式
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.StructStreamingUtil;
import cn.com.bonc.dynamic.OrderedProperties;
import javassist.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class StructuredStreaming10 {

	/**
     * 尝试采用Broadcast广播读取的变量值以生成动态类
	 * @param args
	 */
	public static void main(String[] args) throws IOException {

		SparkSession spark = SparkSession.builder()
				.appName("StructuredStreamingTest")
				.getOrCreate();

		Dataset<Row> lines = spark
				.readStream()
				.format("kafka")
				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
			.option("subscribe", "dgtopic")
				.option("startingOffsets", "earliest")
                .load();

		Dataset<String> dataset=lines.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());

		System.out.println("===============================>dynamic-bean2.properties 10");
		//读取外置文件并进行广播
		//1.读外部配置文件
		Properties properties = new OrderedProperties();
		properties.load(new FileInputStream("dynamic-bean2.properties"));
		List<String>propertyKeyList =new ArrayList<>();
		properties.keySet().stream().forEach( x->propertyKeyList.add(x.toString()));

		System.out.println("===============================>propertyKeyList println:");
		propertyKeyList.stream().forEach(System.out::println);

		//2.广播属性
		JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
		Broadcast<List<String>> listBroadcast = jsc.broadcast(propertyKeyList);

		//读取广播并生成类
		Class dynamicClass=null;
		try {
			dynamicClass = getDynamicClass(propertyKeyList);
		} catch (CannotCompileException e) {
			e.printStackTrace();
		} catch (NotFoundException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}

		Dataset rowDataset = dataset
				.filter((FilterFunction<String>) x -> StructStreamingUtil.isStandardData(x))
				.map((MapFunction<String, Object>) x -> {
							List<String> valueList = Arrays.asList(x.split("[|]"));

							List<String> listBroadcastValue = listBroadcast.getValue();
							Class dynamicClazz = getDynamicClass(listBroadcastValue);
							Object dynamicObject = dynamicClazz.newInstance();
							int index = 0;
							for (String prop : listBroadcastValue) {
								Method setter = dynamicObject.getClass().getMethod("set" + upperFirst(prop), String.class);
								setter.invoke(dynamicObject, valueList.get(index++));
							}
							return dynamicObject;
						},
						Encoders.bean(dynamicClass))
				.toDF();

		System.out.println("===============================>10 rowDataset.printSchema();");
		rowDataset.printSchema();

		// 运行查询并出输出到控制台
		StreamingQuery query = rowDataset
				.writeStream()
				.format("console")
				.outputMode(OutputMode.Append())
				.start();
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			e.printStackTrace();
		}
	}

	private static Class getDynamicClass(List<String> propertyKeyList) throws IOException, CannotCompileException, NotFoundException, ClassNotFoundException {

		if (isPresent("cn.com.bonc.domain.DynamicBean")){
			System.out.println("===============================>class for name get");
			return Class.forName("cn.com.bonc.domain.DynamicBean");

		}

		ClassPool pool = ClassPool.getDefault();
		CtClass ctClass =pool.makeClass("cn.com.bonc.domain.DynamicBean");
		for (String prop:propertyKeyList){

			CtField ctField = CtField.make("private String "+prop+";", ctClass);
			ctClass.addField(ctField);

			String getMethodString ="public String get"+upperFirst(prop)+"() { return this."+prop+";}";
			String setMethodString ="public void set"+upperFirst(prop)+"(String "+prop+") { this."+prop+" = "+prop+";}";

			CtMethod getMethod = CtNewMethod.make(getMethodString, ctClass);
			CtMethod setMethod = CtNewMethod.make(setMethodString, ctClass);
			ctClass.addMethod(getMethod);
			ctClass.addMethod(setMethod);
		}
		return ctClass.toClass();
	}

	private static String upperFirst(String toUpper) {
		return toUpper.substring(0, 1).toUpperCase()+ toUpper.substring(1);
	}

	public static boolean isPresent(String name) {
		try {
			Thread.currentThread().getContextClassLoader().loadClass(name);
			return true;
		} catch (ClassNotFoundException e) {
			return false;
		}
	}
}
